Airflow-провайдеры
В dataCraft Core есть два провайдера для Apache Airflow:
apache_airflow_providers_datacraft_airbyte
Пакет airflow_providers_datacraft_airbyte
позволяет вызывать различные методы API Airbyte как задачи в Airflow. Оператор AirbyteGeneralOperator
дает возможность выполнить произвольный вызов API. Остальные операторы вызывают конкретные методы, но при этом в отличие от API дают возможность работать не с ID, а с именами объектов. Например, для создания Airbyte Source требуется указать ID Airbyte Workspace. В операторе вместо этого можно указать его имя. Чтобы преобразовать имя в ID, потребуется передать дополнительный аргумент, который получается с помощью другого оператора (в данном случае AirbyteListWorkspaceOperator. Если имя уникальное, оператор сам подставит в вызов API нужный ID, если нет — вернет ошибку.
На момент написания, есть две версии API Airbyte:
По операторах по возможности используется официальное API, те части, которые там еще не реализованы, сделаны с помощью устаревшего API.
Провайдер содержит:
Список DAG’ов:
Название | Рещаемая задача |
---|---|
create_connections | Создаёт Airbyte Connection |
install_connectors | Устанавливает коннекторы в Airbyte на основе данных из конфига connectors. |
generate_models | Создает файлы моделей dbt, заполненные содержимым по умолчанию, т.е. вызовом макросов по каждому шагу методологии dataCraft Core |
template_configs | DAG используется для генерации конфигов типа templated_file |
sync_data | Синхронизирует соединения в Airbyte и запускает модели dbt |
Интеграция с dataCraft
apache_airflow_providers_datacraft_dags
Все DAG’и проекта dataCraft Core имеют общую структуру для того, чтобы пользователю легче было построить собственную логику на основе имеющихся “строительных блоков”.
Во всех DAG’ах есть параметр namespace
, который определяет:
get_configs(...)
определяются все конфиги, релевантные для него.Все шаги далее представляют собой задачу или группу задач в Airflow.
2. Шаг before
, на котором DAG выполняет подготовительную работу, например, создает необходимые папки (см. документацию по конкретному DAG’у).
3. Шаг prepare
, на котором DAG определяет список для итерации. Он определяет содержимое всех задач на следующем этапе, например, соединения Airbyte, для которых нужно включить синхронизацию. Пользователь шага prepare
и перед следующим шагом iterate
может добавить задачу, которая будет модифицировать этот список.
4. Шаг iterate
, на котором на каждый элемент списка для итерации создается динамическая задача или динамическая группа задач.
5. Шаг after
, на котором DAG очищает или освобождает выделенные ресурсы, например, удаляет созданные временные папки и файлы.
В среде с установленным Airflow выполнить:
pip install apache_airflow_datacraft_dags_provider
Данная команда установит оба пакета. Если нужен только функционал, связанных с Airbyte, то нужно установить пакет apache_airflow_datacraft_dags_provider
В Airflow создать DAG со следующим содержимым:
from apache_airflow_providers_datacraft import DagBuilder
DagBuilder.create_dags() # Создаем все DAGи с параметрами по умолчанию
Пример с кастомизацией:
from apache_airflow_providers_datacraft import DagBuilder
dag = DagBuilder.prepare_dag("generate_models")
dag.schedule_interval = "@weekly" # Изменяем расписание запуска на 1 раз в неделю
def delete_normalize(prepared_tasks):
del prepared_tasks['1_silos']['normalize']
dag.add_prepare_hook(delete_normalize) # Удаляем задачи, связанные со слоем normalize
apache_airflow_providers_datacraft_defaults
Позволяет получать значения конфигов в тех случаях, когда пользователь не задал никаких значений. Например, при первоначальной настройке Airbyte требуется установить коннекторы. За это отвечает DAG install_connectors, для которого требуется указать пути к образам коннекторов и их документации. Если пользователь этого не сделал, можно взять список коннекторов по умолчанию, который лежит в файле connectors.yaml
данного пакета. Его содержимое в виде словаря можно получить с помощью функции get_datacraft_defaults
.
Для обратной совместимости пакет предусматривает хранение старых версий конфигов. Они имеют такое название, но с суффиксом версии, например connectors_v1.1.yaml
. Этот суффикс передается при вызове get_datacraft_defaults
.
Иногда конфиг по умолчанию зависит от переменной, например, названия проекта. В этом случае в пакете этот конфиг содержится в виде шаблона Jinja, например:
...
entities:
{% if feature_has_metrika %}
- YandexClientId
{% endif %}
- AccountId
...
Переменные для подстановки в шаблон передаются как аргумент в get_datacraft_defaults
в виде словаря, например {"feature_has_metrika": true}
.
from apache_airflow_providers_datacraft_defaults import get_datacraft_defaults
get_datacraft_defaults('connectors', 'yaml', '')
Функция get_datacraft_defaults
принимает три аргумента:
config_name
— название конфига, значение по умолчанию для которого нужно найтиformat
— формат, json
или yaml
suffix
— суффикс для выбора нужной версии (по умолчанию ""
)template_variables
— словарь с переменными для подстановки в шаблон (по умолчанию пустой).